package com.xiaofan.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

/**
 * Demo of the Flink Table API / SQL over a streaming environment:
 * registers a local CSV sensor file as a table, filters it with a SQL
 * query and prints the matching rows.
 */
object TableApiTest {

  def main(args: Array[String]): Unit = {

    // Streaming execution environment; parallelism 1 keeps printed output ordered.
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Table environment layered on top of the streaming environment.
    val tblEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv)

    // 1. Alternative ways to create a table environment (kept for reference):
    //    1.1. old planner, streaming:
    //         EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    //         then StreamTableEnvironment.create(env, settings)
    //    1.2. old planner, batch:
    //         BatchTableEnvironment.create(ExecutionEnvironment.getExecutionEnvironment)
    //    1.3. blink planner, streaming:
    //         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    //         then StreamTableEnvironment.create(env, settings)
    //    1.4. blink planner, batch:
    //         EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    //         then TableEnvironment.create(settings)

    // 2. Connect to external systems, read data, register tables.
    // 2.1. Read from a local CSV file.
    val sensorFilePath = "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

    // Column layout of the sensor file: id, timestamp, temperature.
    val sensorSchema = new Schema()
      .field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())

    tblEnv
      .connect(new FileSystem().path(sensorFilePath))
      .withFormat(new Csv())
      .withSchema(sensorSchema)
      .createTemporaryTable("inputTable")

    // val dataTable: Table = tblEnv.from("inputTable")
    // tblEnv.toAppendStream[(String, Long, Double)](dataTable).print()

    // 2.2. Reading the same schema from Kafka (kept for reference):
    /*
    tblEnv.connect(new Kafka()
        .version("0.11")
        .topic("sensor")
        .property("bootstrap.servers", "192.168.1.23:9091,192.168.1.24:9091,192.168.1.25:9091"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("kafkaInputTable")

    val kafkaResultTable: Table = tblEnv.from("kafkaInputTable")
    tblEnv.toAppendStream[(String, Long, Double)](kafkaResultTable).print()
    */

    // 3. Transformations.
    // 3.1. Table API equivalent of the SQL below (kept for reference):
    // val sensorTable: Table = tblEnv.from("inputTable")
    // val resultTable: Table = sensorTable
    //   .select($"id", $"timestamp", $"temperature")
    //   .filter($"id" === "sensor_1")
    // tblEnv.toAppendStream[(String, Long, Double)](resultTable).print("table result")

    // 3.2. SQL query. NOTE: "timestamp" is a special (reserved) field name in SQL,
    // so it is deliberately not referenced in the query below.
    val filteredBySql: Table = tblEnv.sqlQuery(
      """
        |select id,temperature
        |from inputTable
        |where id = 'sensor_1'
        |""".stripMargin)

    // Emit each matching row as an append-stream record tagged "sql result".
    tblEnv.toAppendStream[(String, Double)](filteredBySql).print("sql result")

    // Launch the job; nothing runs until execute() is called.
    streamEnv.execute("table test")
  }

}


















